
package org.apache.solr.update;

import java.io.IOException;
import java.net.ConnectException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.http.NoHttpResponseException;
import org.apache.http.client.HttpClient;
import org.apache.http.conn.ConnectTimeoutException;
import org.apache.lucene.util.BytesRef;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.impl.HttpClientUtil;
import org.apache.solr.cloud.ZkController;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.StrUtils;
import org.apache.solr.core.SolrCore;
import org.apache.solr.handler.component.HttpShardHandlerFactory;
import org.apache.solr.handler.component.ShardHandler;
import org.apache.solr.handler.component.ShardHandlerFactory;
import org.apache.solr.handler.component.ShardRequest;
import org.apache.solr.handler.component.ShardResponse;
import org.apache.solr.request.LocalSolrQueryRequest;
import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.response.SolrQueryResponse;
import static org.apache.solr.update.processor.DistributedUpdateProcessor.DistribPhase.FROMLEADER;
import org.apache.solr.update.processor.DistributedUpdateProcessorFactory;
import static org.apache.solr.update.processor.DistributingUpdateProcessorFactory.DISTRIB_UPDATE_PARAM;
import org.apache.solr.update.processor.RunUpdateProcessorFactory;
import org.apache.solr.update.processor.UpdateRequestProcessor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** @lucene.experimental */
/**
 * Synchronizes a core's recent updates with a set of peer replicas by comparing
 * the last N version numbers in each update log and fetching/applying any
 * updates we are missing. Used during leader election / recovery.
 *
 * @lucene.experimental
 */
public class PeerSync {

    public static Logger log = LoggerFactory.getLogger(PeerSync.class);
    // Cached once at construction; guards the more expensive debug log calls below.
    public boolean debug = log.isDebugEnabled();
    private List<String> replicas;         // base URLs of the peers to sync against
    private int nUpdates;                  // number of recent versions to request from each peer
    private int maxUpdates;  // maximum number of updates to request before failing
    private UpdateHandler uhandler;
    private UpdateLog ulog;
    private HttpShardHandlerFactory shardHandlerFactory;
    private ShardHandler shardHandler;
    private UpdateLog.RecentUpdates recentUpdates;
    private List<Long> startingVersions;   // optional frame of reference, see setStartingVersions()
    private List<Long> ourUpdates;         // our recent versions, sorted by absolute value, highest first
    private Set<Long> ourUpdateSet;        // fast membership test over ourUpdates
    private Set<Long> requestedUpdateSet;  // versions we have, or have already asked some peer for
    private long ourLowThreshold;  // 20th percentile
    private long ourHighThreshold; // 80th percentile
    private boolean cantReachIsSuccess;    // treat connect failures on the version request as success
    private static final HttpClient client;

    static {
        // One shared client for all PeerSync instances. Retries are disabled
        // because failures are handled explicitly in handleResponse().
        ModifiableSolrParams params = new ModifiableSolrParams();
        params.set(HttpClientUtil.PROP_MAX_CONNECTIONS_PER_HOST, 20);
        params.set(HttpClientUtil.PROP_MAX_CONNECTIONS, 10000);
        params.set(HttpClientUtil.PROP_CONNECTION_TIMEOUT, 30000);
        params.set(HttpClientUtil.PROP_SO_TIMEOUT, 30000);
        params.set(HttpClientUtil.PROP_USE_RETRY, false);
        client = HttpClientUtil.createClient(params);
    }
    // comparator that sorts by absolute value, putting highest first
    private static Comparator<Long> absComparator = new Comparator<Long>() {

        @Override
        public int compare(Long o1, Long o2) {
            // descending by |version|; deletes are encoded as negative versions
            return Long.compare(Math.abs(o2), Math.abs(o1));
        }
    };
    // comparator that sorts update records by absolute value of version, putting lowest first
    private static Comparator<Object> updateRecordComparator = new Comparator<Object>() {

        @Override
        public int compare(Object o1, Object o2) {
            // Non-list entries are pushed to the end; well-formed records are
            // List<Oper,Version,...> with the version at index 1.
            if (!(o1 instanceof List)) {
                return 1;
            }
            if (!(o2 instanceof List)) {
                return -1;
            }

            List lst1 = (List) o1;
            List lst2 = (List) o2;

            return Long.compare(Math.abs((Long) lst1.get(1)), Math.abs((Long) lst2.get(1)));
        }
    };

    /** Shard request carrying per-replica sync bookkeeping. */
    private static class SyncShardRequest extends ShardRequest {

        List<Long> reportedVersions;   // versions the replica reported in phase 1
        List<Long> requestedUpdates;   // versions we asked it to send in phase 2
        Exception updateException;     // first failure while applying its updates
    }

    public PeerSync(SolrCore core, List<String> replicas, int nUpdates) {
        this(core, replicas, nUpdates, false);
    }

    /**
     * @param core               the local core to bring up to date
     * @param replicas           base URLs of the peers to compare against
     * @param nUpdates           number of recent versions to exchange
     * @param cantReachIsSuccess if true, an unreachable peer (connect failure /
     *                           503) during the version request counts as success
     */
    public PeerSync(SolrCore core, List<String> replicas, int nUpdates, boolean cantReachIsSuccess) {

        this.replicas = replicas;
        this.nUpdates = nUpdates;
        this.maxUpdates = nUpdates;
        this.cantReachIsSuccess = cantReachIsSuccess;

        uhandler = core.getUpdateHandler();
        ulog = uhandler.getUpdateLog();
        // TODO: shutdown
        shardHandlerFactory = new HttpShardHandlerFactory();
        shardHandler = shardHandlerFactory.getShardHandler(client);
    }

    // optional list of updates we had before possibly receiving new updates
    public void setStartingVersions(List<Long> startingVersions) {
        this.startingVersions = startingVersions;
    }

    /**
     * Returns the absolute version at the given fraction into the list.
     * The list is expected to be sorted by {@link #absComparator} (highest
     * absolute value first), so frac=0.2 yields a high threshold and frac=0.8
     * a low one. The list must be non-empty.
     */
    public long percentile(List<Long> arr, float frac) {
        int elem = (int) (arr.size() * frac);
        return Math.abs(arr.get(elem));
    }

    // start of peersync related debug messages.  includes the core name for correlation.
    private String msg() {

        ZkController zkController = uhandler.core.getCoreDescriptor().getCoreContainer().getZkController();

        String myURL = "";

        if (zkController != null) {
            myURL = zkController.getBaseUrl();
        }

        // TODO: core name turns up blank in many tests - find URL if cloud enabled?
        return "PeerSync: core=" + uhandler.core.getName() + " url=" + myURL + " ";
    }

    /**
     * Returns true if peer sync was successful, meaning that this core may not
     * be considered to have the latest updates when considering the last N
     * updates between it and it's peers. A commit is not performed.
     */
    public boolean sync() {
        if (ulog == null) {
            return false;
        }

        log.info(msg() + "START replicas=" + replicas + " nUpdates=" + nUpdates);

        // TODO: does it ever make sense to allow sync when buffering or applying buffered?  Someone might request that we do it...
        if (!(ulog.getState() == UpdateLog.State.ACTIVE || ulog.getState() == UpdateLog.State.REPLAYING)) {
            log.error(msg() + "ERROR, update log not in ACTIVE or REPLAY state. " + ulog);
            // return false;
        }

        if (debug) {
            if (startingVersions != null) {
                log.debug(msg() + "startingVersions=" + startingVersions.size() + " " + startingVersions);
            }
        }

        // Fire off the requests before getting our own recent updates (for better concurrency)
        // This also allows us to avoid getting updates we don't need... if we got our updates and then got their updates, they would
        // have newer stuff that we also had (assuming updates are going on and are being forwarded).
        for (String replica : replicas) {
            requestVersions(replica);
        }

        recentUpdates = ulog.getRecentUpdates();
        try {
            ourUpdates = recentUpdates.getVersions(nUpdates);
        }
        finally {
            recentUpdates.close();
        }

        Collections.sort(ourUpdates, absComparator);

        if (startingVersions != null) {
            if (startingVersions.isEmpty()) {
                // no frame of reference to tell if we've missed updates
                log.warn("no frame of reference to tell if we've missed updates");
                return false;
            }
            Collections.sort(startingVersions, absComparator);

            ourLowThreshold = percentile(startingVersions, 0.8f);
            ourHighThreshold = percentile(startingVersions, 0.2f);

            // now make sure that the starting updates overlap our updates
            // there shouldn't be reorders, so any overlap will do.

            long smallestNewUpdate = Math.abs(ourUpdates.get(ourUpdates.size() - 1));

            if (Math.abs(startingVersions.get(0)) < smallestNewUpdate) {
                log.warn(msg() + "too many updates received since start - startingUpdates no longer overlaps with our currentUpdates");
                return false;
            }

            // let's merge the lists
            List<Long> newList = new ArrayList<>(ourUpdates);
            for (Long ver : startingVersions) {
                if (Math.abs(ver) < smallestNewUpdate) {
                    newList.add(ver);
                }
            }

            ourUpdates = newList;
        }
        else {

            if (ourUpdates.size() > 0) {
                ourLowThreshold = percentile(ourUpdates, 0.8f);
                ourHighThreshold = percentile(ourUpdates, 0.2f);
            }
            else {
                // we have no versions and hence no frame of reference to tell if we can use a peers
                // updates to bring us into sync
                log.info(msg() + "DONE.  We have no versions.  sync failed.");
                return false;
            }
        }

        ourUpdateSet = new HashSet<>(ourUpdates);
        requestedUpdateSet = new HashSet<>(ourUpdates);

        // Process responses as they complete; phase-1 (version list) responses
        // may trigger phase-2 (update fetch) requests via handleVersions().
        for (;;) {
            ShardResponse srsp = shardHandler.takeCompletedOrError();
            if (srsp == null) {
                break;
            }
            boolean success = handleResponse(srsp);
            if (!success) {
                log.info(msg() + "DONE. sync failed");
                shardHandler.cancelAll();
                return false;
            }
        }

        log.info(msg() + "DONE. sync succeeded");
        return true;
    }

    /** Phase 1: ask the replica for its last {@code nUpdates} version numbers. */
    private void requestVersions(String replica) {

        SyncShardRequest sreq = new SyncShardRequest();
        sreq.purpose = 1;
        // TODO: this sucks
        if (replica.startsWith("http://")) {
            replica = replica.substring(7);
        }
        sreq.shards = new String[]{replica};
        sreq.actualShards = sreq.shards;
        sreq.params = new ModifiableSolrParams();
        sreq.params.set("qt", "/get");
        sreq.params.set("distrib", false);
        sreq.params.set("getVersions", nUpdates);
        shardHandler.submit(sreq, replica, sreq.params);
    }

    /**
     * Dispatches a completed shard response to the phase-1 (versions) or
     * phase-2 (updates) handler. Returns false if the overall sync must fail.
     */
    private boolean handleResponse(ShardResponse srsp) {

        ShardRequest sreq = srsp.getShardRequest();

        if (srsp.getException() != null) {

            // TODO: look at this more thoroughly - we don't want
            // to fail on connection exceptions, but it may make sense
            // to determine this based on the number of fails
            //
            // If the replica went down between asking for versions and asking for specific updates, that
            // shouldn't be treated as success since we counted on getting those updates back (and avoided
            // redundantly asking other replicas for them).
            if (cantReachIsSuccess && sreq.purpose == 1 && srsp.getException() instanceof SolrServerException) {
                Throwable solrException = ((SolrServerException) srsp.getException())
                        .getRootCause();
                if (solrException instanceof ConnectException || solrException instanceof ConnectTimeoutException
                        || solrException instanceof NoHttpResponseException) {
                    log.warn(msg() + " couldn't connect to " + srsp.getShardAddress() + ", counting as success");

                    return true;
                }
            }

            if (cantReachIsSuccess && sreq.purpose == 1 && srsp.getException() instanceof SolrException && ((SolrException) srsp.getException()).code() == 503) {
                log.warn(msg() + " got a 503 from " + srsp.getShardAddress() + ", counting as success");
                return true;
            }

            log.warn(msg() + " exception talking to " + srsp.getShardAddress() + ", failed", srsp.getException());

            return false;
        }

        if (sreq.purpose == 1) {
            return handleVersions(srsp);
        }
        else {
            return handleUpdates(srsp);
        }
    }

    /**
     * Phase-1 handler: compares the replica's version list against ours and,
     * when the windows overlap, requests any versions we are missing.
     */
    @SuppressWarnings("unchecked")
    private boolean handleVersions(ShardResponse srsp) {
        // we retrieved the last N updates from the replica
        List<Long> otherVersions = (List<Long>) srsp.getSolrResponse().getResponse().get("versions");
        // TODO: how to handle short lists?

        SyncShardRequest sreq = (SyncShardRequest) srsp.getShardRequest();
        sreq.reportedVersions = otherVersions;

        log.info(msg() + " Received " + otherVersions.size() + " versions from " + sreq.shards[0]);

        if (otherVersions.isEmpty()) {
            return true;
        }

        boolean completeList = otherVersions.size() < nUpdates;  // do we have their complete list of updates?

        Collections.sort(otherVersions, absComparator);

        if (debug) {
            log.debug(msg() + " sorted versions from " + sreq.shards[0] + " = " + otherVersions);
        }

        long otherHigh = percentile(otherVersions, .2f);
        long otherLow = percentile(otherVersions, .8f);

        if (ourHighThreshold < otherLow) {
            // Small overlap between version windows and ours is older
            // This means that we might miss updates if we attempted to use this method.
            // Since there exists just one replica that is so much newer, we must
            // fail the sync.
            log.info(msg() + " Our versions are too old. ourHighThreshold=" + ourHighThreshold + " otherLowThreshold=" + otherLow);
            return false;
        }

        if (ourLowThreshold > otherHigh) {
            // Small overlap between windows and ours is newer.
            // Using this list to sync would result in requesting/replaying results we don't need
            // and possibly bringing deleted docs back to life.
            log.info(msg() + " Our versions are newer. ourLowThreshold=" + ourLowThreshold + " otherHigh=" + otherHigh);
            return true;
        }

        List<Long> toRequest = new ArrayList<>();
        for (Long otherVersion : otherVersions) {
            // stop when the entries get old enough that reorders may lead us to see updates we don't need
            if (!completeList && Math.abs(otherVersion) < ourLowThreshold) {
                break;
            }

            if (ourUpdateSet.contains(otherVersion) || requestedUpdateSet.contains(otherVersion)) {
                // we either have this update, or already requested it
                // TODO: what if the shard we previously requested this from returns failure (because it goes
                // down)
                continue;
            }

            toRequest.add(otherVersion);
            requestedUpdateSet.add(otherVersion);
        }

        sreq.requestedUpdates = toRequest;

        if (toRequest.isEmpty()) {
            log.info(msg() + " Our versions are newer. ourLowThreshold=" + ourLowThreshold + " otherHigh=" + otherHigh);

            // we had (or already requested) all the updates referenced by the replica
            return true;
        }

        if (toRequest.size() > maxUpdates) {
            log.info(msg() + " Failing due to needing too many updates:" + maxUpdates);
            return false;
        }

        return requestUpdates(srsp, toRequest);
    }

    /** Phase 2: ask the replica for the full records of the given versions. */
    private boolean requestUpdates(ShardResponse srsp, List<Long> toRequest) {
        String replica = srsp.getShardRequest().shards[0];

        log.info(msg() + "Requesting updates from " + replica + " n=" + toRequest.size() + " versions=" + toRequest);

        // reuse our original request object
        ShardRequest sreq = srsp.getShardRequest();

        sreq.purpose = 0;
        sreq.params = new ModifiableSolrParams();
        sreq.params.set("qt", "/get");
        sreq.params.set("distrib", false);
        sreq.params.set("getUpdates", StrUtils.join(toRequest, ','));
        sreq.responses.clear();  // needs to be zeroed for correct correlation to occur

        shardHandler.submit(sreq, sreq.shards[0], sreq.params);

        return true;
    }

    /**
     * Phase-2 handler: applies the update records returned by a replica to the
     * local core, oldest first, flagged as PEER_SYNC so versioning treats them
     * as replayed rather than new updates.
     */
    @SuppressWarnings("unchecked")
    private boolean handleUpdates(ShardResponse srsp) {
        // we retrieved the last N updates from the replica
        List<Object> updates = (List<Object>) srsp.getSolrResponse().getResponse().get("updates");

        SyncShardRequest sreq = (SyncShardRequest) srsp.getShardRequest();
        if (updates.size() < sreq.requestedUpdates.size()) {
            log.error(msg() + " Requested " + sreq.requestedUpdates.size() + " updates from " + sreq.shards[0] + " but retrieved " + updates.size());
            return false;
        }

        ModifiableSolrParams params = new ModifiableSolrParams();
        params.set(DISTRIB_UPDATE_PARAM, FROMLEADER.toString());
        // params.set("peersync",true); // debugging
        SolrQueryRequest req = new LocalSolrQueryRequest(uhandler.core, params);
        SolrQueryResponse rsp = new SolrQueryResponse();

        RunUpdateProcessorFactory runFac = new RunUpdateProcessorFactory();
        DistributedUpdateProcessorFactory magicFac = new DistributedUpdateProcessorFactory();
        runFac.init(new NamedList<>());
        magicFac.init(new NamedList<>());

        UpdateRequestProcessor proc = magicFac.getInstance(req, rsp, runFac.getInstance(req, rsp, null));

        Collections.sort(updates, updateRecordComparator);

        Object o = null;
        long lastVersion = 0;
        try {
            // Apply oldest updates first
            for (Object obj : updates) {
                // should currently be a List<Oper,Ver,Doc/Id>
                o = obj;
                List<Object> entry = (List<Object>) o;

                if (debug) {
                    log.debug(msg() + "raw update record " + o);
                }

                int oper = (Integer) entry.get(0) & UpdateLog.OPERATION_MASK;
                long version = (Long) entry.get(1);
                // skip duplicate versions (the same update may come from multiple peers)
                if (version == lastVersion && version != 0) {
                    continue;
                }
                lastVersion = version;

                switch (oper) {
                    case UpdateLog.ADD: {
                        // byte[] idBytes = (byte[]) entry.get(2);
                        SolrInputDocument sdoc = (SolrInputDocument) entry.get(entry.size() - 1);
                        AddUpdateCommand cmd = new AddUpdateCommand(req);
                        // cmd.setIndexedId(new BytesRef(idBytes));
                        cmd.solrDoc = sdoc;
                        cmd.setVersion(version);
                        cmd.setFlags(UpdateCommand.PEER_SYNC | UpdateCommand.IGNORE_AUTOCOMMIT);
                        if (debug) {
                            log.debug(msg() + "add " + cmd);
                        }
                        proc.processAdd(cmd);
                        break;
                    }
                    case UpdateLog.DELETE: {
                        byte[] idBytes = (byte[]) entry.get(2);
                        DeleteUpdateCommand cmd = new DeleteUpdateCommand(req);
                        cmd.setIndexedId(new BytesRef(idBytes));
                        cmd.setVersion(version);
                        cmd.setFlags(UpdateCommand.PEER_SYNC | UpdateCommand.IGNORE_AUTOCOMMIT);
                        if (debug) {
                            log.debug(msg() + "delete " + cmd);
                        }
                        proc.processDelete(cmd);
                        break;
                    }
                    case UpdateLog.DELETE_BY_QUERY: {
                        String query = (String) entry.get(2);
                        DeleteUpdateCommand cmd = new DeleteUpdateCommand(req);
                        cmd.query = query;
                        cmd.setVersion(version);
                        cmd.setFlags(UpdateCommand.PEER_SYNC | UpdateCommand.IGNORE_AUTOCOMMIT);
                        if (debug) {
                            log.debug(msg() + "deleteByQuery " + cmd);
                        }
                        proc.processDelete(cmd);
                        break;
                    }
                    default:
                        throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Unknown Operation! " + oper);
                }

            }

        }
        catch (Exception e) {
            // TODO: should an IOException be handled separately as a problem with us?
            // I guess it probably already will by causing replication to be kicked off.
            sreq.updateException = e;
            // sreq.shards always has exactly one element here (set in requestVersions)
            log.error(msg() + "Error applying updates from " + sreq.shards[0] + " ,update=" + o, e);
            return false;
        }
        finally {
            try {
                proc.finish();
            }
            catch (Exception e) {
                sreq.updateException = e;
                log.error(msg() + "Error applying updates from " + sreq.shards[0] + " ,finish()", e);
                return false;
            }
        }

        return true;
    }

    /**
     * Requests and applies recent updates from peers
     */
    public static void sync(SolrCore core, List<String> replicas, int nUpdates) {

        ShardHandlerFactory shardHandlerFactory = core.getCoreDescriptor().getCoreContainer().getShardHandlerFactory();

        ShardHandler shardHandler = shardHandlerFactory.getShardHandler();

        for (String replica : replicas) {
            ShardRequest sreq = new ShardRequest();
            sreq.shards = new String[]{replica};
            sreq.params = new ModifiableSolrParams();
            sreq.params.set("qt", "/get");
            sreq.params.set("distrib", false);
            sreq.params.set("getVersions", nUpdates);

            shardHandler.submit(sreq, replica, sreq.params);
        }

        // Wait for one response per submitted request; log (rather than
        // silently drop) any failures since this method has no return value.
        for (int i = 0; i < replicas.size(); i++) {
            ShardResponse srsp = shardHandler.takeCompletedOrError();
            if (srsp != null && srsp.getException() != null) {
                log.warn("PeerSync: error talking to " + srsp.getShardAddress(), srsp.getException());
            }
        }

    }
}
